package org.example.mq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**
 *
 * 使用生产者/消费者模型的任务调度线程池，可以通过简单的配置，约束线程的消费速度，减少对系统的压力
 * @author steveGuRen
 */
public class DispatchThreadPool {

    private final static transient Logger logger = LoggerFactory.getLogger(DispatchThreadPool.class);

    private String poolName = "调度";

    public DispatchThreadPool(String poolName) {
        this.poolName = poolName;
    }

    /**
     * 任务执行线程池
     */
    private ThreadPoolExecutor threadPool = null;

    /**
     * 任务队列
     */
    private BlockingQueue<Runnable> transactionQuene = new ArrayBlockingQueue(DispatchThreadConfig.MAX_QUEUE);

    /**
     * 任务调度线程池
     */
    private ScheduledThreadPoolExecutor scheduledDispatch = new ScheduledThreadPoolExecutor(2);

    /**
     * 等待通知模型异步通知线程
     */
    private ThreadPoolExecutor single =  new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,  new ArrayBlockingQueue(DispatchThreadConfig.MAX_QUEUE));

    /**
     * 是否启动了任务队列，延迟初始化调度队列
     */
    private volatile Boolean isStart = false;

    private volatile Long lastSendEmailTime = 0L;

    private volatile Boolean isSendEmail = false;

    private volatile Boolean isWait = false;

    /**
     * 轮询的update的相关业务放到线程池，统一调度，约束最大并行数量
     * @param runnable
     */
    public void execute(Runnable runnable) {
        // 将任务放到队列
        boolean isMax = transactionQuene.offer(runnable);
        //如果启用了等待通知模型，则通过等待/通知模型通知调度线程进行调度
        if(DispatchThreadConfig.USE_WAIT_NOTIFY.equals(DispatchThreadConfig.IS_USE_WAIT_NOTIFY)) {
            if(isWait) {
                //异步通知调度线程进行调度
                single.execute(()-> {
                    synchronized (transactionQuene) {
                        if(isWait) {
                            transactionQuene.notifyAll();
                            isWait = false;
                        }
                    }
                });
            }

        }
        try {
            logger.debug(Thread.currentThread().getName() + "当前队列数量" + transactionQuene.size());
            if(!isMax) {
                //队列满了，检查是否要发送邮件，是则发送一次
                if (DispatchThreadConfig.IS_NEED_SEND_EMAIL.equals(DispatchThreadConfig.NEED_SEND_EMAIL)) {
                    Long currentTime = System.currentTimeMillis();
                    double cost = (currentTime - lastSendEmailTime) / 1000;
                    if(cost > 60) {
                        if(!this.isSendEmail) {
                            //确保队列满时60S内只发送一次邮件
                            synchronized (this.isSendEmail) {
                                if(!this.isSendEmail) {
                                    //每隔60S且队列满的时候才发送邮件
                                    this.lastSendEmailTime = currentTime;
                                    logger.debug("线程池" + this.poolName  + "队列满了，后续入队数据将会被丢弃;" + "目前队列数量最大为" + DispatchThreadConfig.MAX_QUEUE);
                                    //此处可以发送友建等信息
                                    this.isSendEmail = true;
                                } else {
                                    //60秒内已经发送过邮件，不再发送
                                }
                            }

                        }
                    }
                }
            } else {
                //队列没满，将发送邮件标识为没发送
                this.isSendEmail = false;
            }
            if(!this.isStart) {
                //如果没有启动，开始启动调度线程
                synchronized (this.isStart) {
                    if(!isStart) {
                        threadPool = new ThreadPoolExecutor(DispatchThreadConfig.CORE_THREAD, DispatchThreadConfig.MAX_THREAD, 0, TimeUnit.SECONDS,  new ArrayBlockingQueue(DispatchThreadConfig.MAX_QUEUE));
                        scheduledDispatch.scheduleAtFixedRate(
                                new DispatchThreadPool.dispatchThread(this.poolName)
                        , 0, DispatchThreadConfig.PER_HANDLE_PERIOD, TimeUnit.MILLISECONDS);
                        isStart = true;
                    }
                }
            } else {
                //已经启动了调度队列和执行线程
            }
        } catch (Exception e) {
            logger.debug("线程放进队列错误", e);
        }
    }


    /**
     * 调度线程，从队列获取一定数量的任务到工作线程池执行，每次最多调度LiveCountConfig.PER_HANDLE_NUM 个任务
     */
     class dispatchThread implements Runnable {

        private AtomicLong j = new AtomicLong();

        /**
         * 线程名称
         */
        private String name = "";

        public dispatchThread(String name) {
            if(name != null) {
                this.name = name;
            }
        }

        public void run() {
            //如果启用了等待通知模型，则主动在队列为空时进行等待
            if(DispatchThreadConfig.USE_WAIT_NOTIFY.equals(DispatchThreadConfig.IS_USE_WAIT_NOTIFY)) {
                    //队列为空，等待
                    if(transactionQuene.isEmpty()) {
                        synchronized (transactionQuene) {
                            try {
                                if(transactionQuene.isEmpty()) {
                                    logger.debug("当前调度线程池：" + name + ": 系统线程名称：" + Thread.currentThread().getName() + "：当前完成队列为空，等待，不再执行");
                                    isWait = true;
                                    transactionQuene.wait();
                                }
                            } catch (InterruptedException e) {
                               logger.debug("调度线程调度错误", e);
                            }
                        }
                    }
            }
            logger.debug("当前调度线程池:" + name + ": 第" + j.getAndAdd(1L) + "次执行" );
            try {
                int i = 0;
                Runnable runnable = null;
                for(; i < DispatchThreadConfig.PER_HANDLE_NUM; i++) {
                    runnable = transactionQuene.poll();
                    if(runnable == null) {
                        break;
                    } else {
                        threadPool.execute(runnable);
                    }
                }
                logger.debug("当前调度线程池：" + name + ": 系统线程名称：" + Thread.currentThread().getName() + "：当前完成调度数量" + i);
            } catch (Exception e) {
                logger.debug("调度线程调度错误", e);
            }
        }

    }

}
